Kafka-0.8.2.1 producer&consumer示例

一、github源码

Kafka-0.8.2.1 producer&consumer 示例源码github地址:

https://github.com/shiyueqi/kafka-test

二、Kafka安装与配置

Kafka-0.8.2.1安装与配置

三、Kafka官方示例

Kafka wiki

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

0.8.2版本的producer代码

https://github.com/apache/kafka/blob/0.8.2/examples/src/main/java/kafka/examples/Producer.java

0.8.2版本的consumer代码

https://github.com/apache/kafka/blob/0.8.2/examples/src/main/java/kafka/examples/Consumer.java

0.10.2.0版本的producer代码

https://github.com/apache/kafka/blob/0.10.2/examples/src/main/java/kafka/examples/Producer.java

0.10.2.0版本的consumer代码

https://github.com/apache/kafka/blob/0.10.2/examples/src/main/java/kafka/examples/Consumer.java

四、Kafka-0.8.2.1 producer

maven依赖

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
</dependencies>

代码

package com.unionpay.kafka.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Sample Kafka 0.8.2.1 producer: sends a fixed number of string messages to
 * {@link #KAFKA_TOPIC}, waiting for each send to complete before sending the next.
 *
 * A JVM shutdown hook registered by the constructor closes the underlying
 * {@link KafkaProducer} when the process exits.
 *
 * date: 2017/04/21 14:54.
 * author: Yueqi Shi
 */
public class ProducerMain {
    /** Topic every message is sent to. */
    public static final String KAFKA_TOPIC = "topic_1";

    /** Comma-separated broker list passed as the bootstrap servers setting. */
    private static final String BROKERS_ADDRESS = "172.18.55.21:9092,172.18.55.21:9093";

    /** Value passed (as a string) for the producer's acks setting. */
    private static final int REQUEST_REQUIRED_ACKS = 1;

    /** Prefix of every message payload; the message index is appended to it. */
    public static final String MESSAGE = "message";

    private static final String CLIENT_ID = "producer_test_id";

    /** Number of messages sent by {@link #run()}. */
    private static final int MESSAGE_COUNT = 10000;

    private final KafkaProducer<String, String> producer;

    /**
     * Set once {@link #shutdown()} has been called. Volatile because the
     * shutdown hook thread writes it while the thread executing {@link #run()} reads it.
     */
    private volatile boolean closed;

    /**
     * Creates the producer and registers a shutdown hook that closes it.
     */
    public ProducerMain() {
        producer = new KafkaProducer<String, String>(buildProperties());

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.print("Kafka producer graceful shutdown.");
                ProducerMain.this.shutdown();
            }
        });
    }

    /**
     * Builds the producer configuration: broker list, string key/value
     * serializers, acks and client id.
     *
     * @return the properties handed to {@link KafkaProducer}
     */
    private Properties buildProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS));
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);

        return props;
    }

    /**
     * Starts producing messages.
     *
     * Sends {@code MESSAGE_0 .. MESSAGE_9999} one at a time, blocking on each
     * returned future. A failed send is reported and the loop moves on to the
     * next message. The loop stops early if the producer has been shut down or
     * the calling thread is interrupted.
     */
    public void run() {
        // Stop as soon as shutdown() has been called (e.g. from the shutdown hook)
        // instead of continuing to use a producer that is being closed.
        for (int i = 0; i < MESSAGE_COUNT && !closed; i++) {
            String message = ProducerMain.MESSAGE + "_" + i;

            try {
                // get() blocks until this send has completed or failed.
                producer.send(new ProducerRecord<String, String>(ProducerMain.KAFKA_TOPIC, message)).get();

                System.out.println("produce: " + message + " success.");
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("produce: " + message + " fail.");
                // Restore the interrupt flag for the caller and stop producing:
                // an interrupt is a request to cancel, not a per-message failure.
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
                System.out.println("produce: " + message + " fail.");
            }
        }
    }

    /**
     * Closes the underlying producer. Safe to call more than once and from
     * any thread; only the first call closes the producer.
     */
    public synchronized void shutdown() {
        if (closed) {
            return;
        }
        closed = true;
        producer.close();
    }

    /**
     * Misspelled alias kept so existing callers keep compiling.
     *
     * @deprecated use {@link #shutdown()} instead.
     */
    @Deprecated
    public void shuntown() {
        shutdown();
    }

    public static void main(String[] args) {
        new ProducerMain().run();
    }
}

代码说明

  • 需指定topic名称

  • 需指定Kafka的broker集群地址

五、Kafka-0.8.2.1 consumer

maven依赖

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
    </dependency>
</dependencies>

代码

ConsumerExecutorService类,即消费到消息后负责处理消息的执行类

package com.unionpay.kafka.test;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * Worker that drains one {@link KafkaStream} and prints every message it
 * receives to standard output.
 *
 * date: 2017/04/24 13:53.
 * author: Yueqi Shi
 */
public class ConsumerExecutorService implements Runnable {
    /**
     * Charset used to decode message payloads. Fixed to UTF-8 instead of the
     * platform default so the output does not depend on the JVM's locale.
     * NOTE(review): assumes the producer encodes payloads as UTF-8 (believed to
     * be the StringSerializer default) - confirm if another encoding is configured.
     */
    private static final java.nio.charset.Charset MESSAGE_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private final KafkaStream<byte[], byte[]> stream;

    /**
     * @param stream the stream this worker consumes from
     */
    public ConsumerExecutorService(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    /**
     * Iterates over the stream, decoding each payload as text and printing it
     * with a {@code "consume: "} prefix. Returns when the iterator reports no
     * further messages.
     */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message(), MESSAGE_CHARSET);
            System.out.println("consume: " + message);
        }
    }
}

ConsumerMain类

package com.unionpay.kafka.test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sample Kafka 0.8.2.1 high-level consumer: connects through ZooKeeper,
 * opens {@link #THREADS_NUM} stream(s) for {@link #KAFKA_TOPIC} and hands each
 * stream to a {@link ConsumerExecutorService} running on a fixed thread pool.
 *
 * date: 2017/04/21 15:11.
 * author: Yueqi Shi
 */
public class ConsumerMain {
    /** Topic to consume from. */
    public static final String KAFKA_TOPIC = "topic_1";

    /** Comma-separated ZooKeeper addresses passed as {@code zookeeper.connect}. */
    private static final String ZOOKEEPER_ADDRESS = "172.18.55.21:2181,172.18.55.21:2182";

    /** Number of streams requested for the topic and size of the worker pool. */
    private static final int THREADS_NUM = 1;

    private static final String CLIENT_ID = "consumer_test_id";

    private final ConsumerConnector consumerConnector;

    private final ExecutorService executor;

    /**
     * Creates the consumer connector and the worker thread pool.
     */
    public ConsumerMain() {
        consumerConnector = Consumer.createJavaConsumerConnector(buildProperties());
        executor = Executors.newFixedThreadPool(THREADS_NUM);
    }

    /**
     * Builds the connection parameters.
     *
     * @return the consumer configuration (ZooKeeper address, group id,
     *         timeouts, auto-commit interval, offset reset policy, client id)
     */
    private ConsumerConfig buildProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", ZOOKEEPER_ADDRESS);
        props.put("group.id", "test_group");

        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put("client.id", CLIENT_ID);
        return new ConsumerConfig(props);
    }

    /**
     * Starts consuming: requests {@link #THREADS_NUM} stream(s) for the topic
     * and submits one worker per stream to the executor. Returns immediately;
     * consumption continues on the pool threads.
     */
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KAFKA_TOPIC, THREADS_NUM);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(KAFKA_TOPIC);

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerExecutorService(stream));
        }
    }

    /**
     * Shuts down the consumer connector and then the worker pool, waiting up
     * to five seconds for the workers to finish.
     */
    public void shutdown() {
        // Connector first, then the pool (same order as the original code).
        consumerConnector.shutdown();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
            // Preserve the interrupt status for whoever called shutdown().
            Thread.currentThread().interrupt();
        }
    }


    public static void main(String[] args) {
        final ConsumerMain consumer = new ConsumerMain();

        // Without this hook shutdown() is never invoked, so the connector
        // would not be closed when the process is stopped.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.shutdown();
            }
        });

        consumer.run();
    }
}

代码说明

  • 需指定topic
  • 需指定Zookeeper集群地址
  • 需指定group.id。Kafka的consumer以group为单位进行消费:同一个group中,一条消息被其中一个consumer实例消费后,组内其他consumer实例不会再消费该消息。

六、版权声明

转载请注明出处:https://shiyueqi.github.io/2017/04/27/Kafka-0.8.2.1 producer&consumer示例/


Author: Yueqi Shi

Date: 2017-04-27 11:20:00 AM